package com.bigdata.graph

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

object Util {

  /**
   * Reads an undirected edge list from HDFS as raw string pairs.
   *
   * Lines starting with `#` are treated as comments and skipped, as are
   * lines that split into fewer than two fields. Extra fields beyond the
   * first two are ignored.
   *
   * @param sc        active SparkContext
   * @param filePath  HDFS path of the edge-list file
   * @param split     delimiter regex passed to `String.split`
   * @param partition minimum number of partitions for the input RDD
   * @return RDD of (srcNode, dstNode) string pairs
   */
  def readUndirectDataFromHDFS(sc: SparkContext, filePath: String, split: String, partition: Int): RDD[(String, String)] = {
    sc.textFile(filePath, partition)
      .flatMap { line =>
        if (line.startsWith("#")) {
          Iterator.empty
        } else {
          val fields = line.split(split)
          if (fields.length < 2) {
            Iterator.empty
          } else {
            Iterator((fields(0), fields(1)))
          }
        }
      }
  }

  /**
   * Reads a (nodeId, communityId) assignment file from HDFS.
   *
   * Lines starting with `#` are skipped. Surrounding parentheses (e.g. from
   * a saved Scala tuple such as `(1,2)`) are stripped before splitting.
   * Lines with fewer than two fields are dropped.
   *
   * Note: fields are parsed with `toInt`, so a non-numeric field will fail
   * the Spark task — input is assumed to be well-formed integers.
   *
   * @param sc        active SparkContext
   * @param filePath  HDFS path of the community file
   * @param split     delimiter regex passed to `String.split`
   * @param partition minimum number of partitions for the input RDD
   * @return RDD of (nodeId, communityId) pairs
   */
  def readCommFromHDFS(sc: SparkContext, filePath: String, split: String, partition: Int): RDD[(Int, Int)] = {
    sc.textFile(filePath, partition)
      .flatMap { line =>
        if (line.startsWith("#")) {
          Iterator.empty
        } else {
          // Literal character replacement; a no-op when no parentheses
          // are present, so no contains() pre-check is needed.
          val cleaned = line.replace("(", "").replace(")", "")
          val fields = cleaned.split(split)
          if (fields.length < 2) {
            Iterator.empty
          } else {
            Iterator((fields(0).toInt, fields(1).toInt))
          }
        }
      }
  }

  /**
   * Reads an integer edge list from HDFS, optionally with edge weights.
   *
   * Lines starting with `#` are skipped. For unweighted input every edge
   * gets weight 1.0; for weighted input the third field is parsed as the
   * weight. Malformed lines (too few fields for the requested mode) are
   * dropped instead of failing the task.
   *
   * @param sc         active SparkContext
   * @param filePath   HDFS path of the edge-list file
   * @param split      delimiter regex passed to `String.split`
   * @param isWeighted whether each line carries a third, weight field
   * @param partition  minimum number of partitions for the input RDD
   * @return RDD of (srcId, dstId, weight) triples
   */
  def readGraphFromHDFS(sc: SparkContext, filePath: String, split: String, isWeighted: Boolean, partition: Int): RDD[(Int, Int, Double)] = {
    sc.textFile(filePath, partition)
      .flatMap { line =>
        if (line.startsWith("#")) {
          Iterator.empty
        } else {
          val fields = line.split(split)
          // Bug fix: a weighted edge needs 3 fields; the old check only
          // required 2 and then read fields(2), throwing
          // ArrayIndexOutOfBoundsException on short weighted lines.
          val requiredFields = if (isWeighted) 3 else 2
          if (fields.length < requiredFields) {
            Iterator.empty
          } else {
            val weight = if (isWeighted) fields(2).toDouble else 1.0
            Iterator((fields(0).toInt, fields(1).toInt, weight))
          }
        }
      }
  }

  /**
   * Saves an RDD to HDFS using each element's `toString` representation.
   *
   * @param data     RDD to persist
   * @param filePath HDFS output directory (must not already exist)
   */
  def saveDataToHDFS[T: ClassTag](data: RDD[T], filePath: String): Unit = {
    data.saveAsTextFile(filePath)
  }

  /**
   * Saves a pair RDD to HDFS, one `key<split>value` line per element.
   *
   * @param data     pair RDD to persist
   * @param split    separator written between key and value
   * @param filePath HDFS output directory (must not already exist)
   */
  def saveDataToHDFS[T: ClassTag, V: ClassTag](data: RDD[(T, V)], split: String, filePath: String): Unit = {
    data.map(f => f._1 + split + f._2).saveAsTextFile(filePath)
  }

}
